REST-Java High Level REST Client 文档管理 API

文档操作API

添加文档数据

  • 构建索引请求
    1
    2
    3
    4
    5
    6
    IndexRequest request = new IndexRequest("posts");
    request.id("1"); //文档ID
    //数据
    String jsonString =
    "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
    request.source(jsonString, XContentType.JSON);

除了JSON字符串还可使用Map和XContentBuilder提供数据,另外还可使用 Object key-pairs 的方式:source(Object… source)

  • 其他可选参数
  1. 路由设置

    1
    request.routing("routing");//指定分片
  2. 超时时间设置

    1
    2
    3
    4
    //两种方式均可
    //主分片达到可用状态的超时时间
    request.timeout(TimeValue.timeValueSeconds(1));
    request.timeout("1s");
  3. 刷新策略

    1
    2
    3
    4
    //两种方式均可
    //刷新策略不设置默认是NONE,不刷新
    request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    request.setRefreshPolicy("wait_for");

刷新策略参考:https://blog.csdn.net/hanchao5272/article/details/89151166

  1. 版本
    1
    2
    //常用于并发控制,比如两个人同时修改数据,如何保证自己的操作不会被覆盖?
    request.version(2);
  1. 版本类型

    1
    2
    3
    4
    5
    //两种方式均可
    //设置为CREATE则请求会创建数据,如果已存在则会报错
    //默认属性是DocWriteRequest.OpType.INDEX,存在则更新,不存在则添加
    request.opType(DocWriteRequest.OpType.CREATE);
    request.opType("create");
  2. 设置管道名称

    1
    request.setPipeline("pipeline");
  • 同步调用

    1
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
  • 异步调用
    先构建监听器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    listener = new ActionListener<IndexResponse>() {         @Override 
    public void onResponse(IndexResponse indexResponse) {
    //成功的处理逻辑
    }
    @Override
    public void onFailure(Exception e) {
    //失败的处理逻辑
    }
    };

然后调用

1
client.indexAsync(request, RequestOptions.DEFAULT, listener);

  • 响应结果处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    String index = indexResponse.getIndex();
    String id = indexResponse.getId();
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    //当数据是insert时的逻辑
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    //当数据是update时的逻辑
    }
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    //当成功的分片数少于总分片数的情况处理
    }
    if (shardInfo.getFailed() > 0) {
    //其他异常处理
    for (ReplicationResponse.ShardInfo.Failure failure :
    shardInfo.getFailures()) {
    String reason = failure.reason();
    }
    }
  • 存在版本冲突报错:ElasticsearchException

1
2
3
4
5
6
7
8
9
10
11
12
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {

}
}

版本冲突就会造成报异常,如设置request.opType(DocWriteRequest.OpType.CREATE);后添加了一条已存在的数据。或者设置了setIfSeqNo(10L)等操作